package org.jetbrains.exposed.v1.r2dbc.statements

import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.awaitFirstOrNull
import org.jetbrains.exposed.v1.core.ArrayColumnType
import org.jetbrains.exposed.v1.core.BinaryColumnType
import org.jetbrains.exposed.v1.core.BlobColumnType
import org.jetbrains.exposed.v1.core.IColumnType
import org.jetbrains.exposed.v1.core.InternalApi
import org.jetbrains.exposed.v1.core.VarCharColumnType
import org.jetbrains.exposed.v1.core.statements.StatementResult
import org.jetbrains.exposed.v1.core.vendors.DatabaseDialect
import org.jetbrains.exposed.v1.r2dbc.mappers.R2dbcTypeMapping
import org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcPreparedStatementApi
import org.jetbrains.exposed.v1.r2dbc.statements.api.R2dbcResult
import java.io.InputStream
import java.time.Duration

/**
 * Class representing a precompiled SQL [Statement] from the R2DBC SPI.
 *
 * The result row generated by executing this statement contains auto-generated keys based on the value of
 * [wasGeneratedKeysRequested].
 */
class R2dbcPreparedStatementImpl(
    private val statement: Statement,
    // the property below is only here for setTimeout() --> should this logic be in R2dbcConnectionImpl instead
    val connection: Connection,
    val wasGeneratedKeysRequested: Boolean,
    private val currentDialect: DatabaseDialect,
    private val typeMapping: R2dbcTypeMapping
) : R2dbcPreparedStatementApi {
    private var resultRow: R2dbcResult? = null

    override suspend fun getResultRow(): R2dbcResult? {
        if (resultRow == null && wasGeneratedKeysRequested) {
            val resultPublisher = statement.execute()
            resultRow = R2dbcResult(resultPublisher, typeMapping)
        }

        return resultRow
    }

    override suspend fun setFetchSize(value: Int?) {
        value?.let { statement.fetchSize(value) }
    }

    override suspend fun setTimeout(value: Int?) {
        value?.let {
            connection.setStatementTimeout(Duration.ofSeconds(value.toLong())).awaitFirstOrNull()
        }
    }

    override suspend fun addBatch() {
        statement.add()
    }

    override suspend fun executeQuery(): R2dbcResult = R2dbcResult(statement.execute(), typeMapping)

    override suspend fun executeUpdate() {
        val result = statement.execute()
        val r2dbcResult = R2dbcResult(result, typeMapping)
        resultRow = r2dbcResult
    }

    override suspend fun executeMultiple(): List<StatementResult> {
        val result = statement.execute()
        val r2dbcResult = R2dbcResult(result, typeMapping)
        return listOf(StatementResult.Object(r2dbcResult))
        // full JDBC logic does not seem possible here
//        return if (statement.execute()) {
//            listOf(StatementResult.Object(JdbcResult(statement.resultSet)))
//        } else {
//            // getMoreResults() returns true only if next result is a ResultSet
//            while (!statement.getMoreResults(Statement.CLOSE_CURRENT_RESULT)) {
//                if (statement.updateCount == -1) return emptyList()
//            }
//            listOf(StatementResult.Object(JdbcResult(statement.resultSet)))
//        }
    }

    @Deprecated(
        message = "This operator function will be removed in future releases. " +
            "Replace with the method `set(index, value, this)` that accepts a third argument for the IColumnType of the parameter value being bound.",
        level = DeprecationLevel.ERROR
    )
    override fun set(index: Int, value: Any) {
        set(index, value, VarCharColumnType())
    }

    override fun set(index: Int, value: Any, columnType: IColumnType<*>) {
        // Try to use the type mappers first
        if (typeMapping.setValue(statement, currentDialect, columnType, value, index)) {
            return
        }

        throw IllegalArgumentException("Unsupported value type: ${value::class.qualifiedName}")
    }

    override fun setNull(index: Int, columnType: IColumnType<*>) {
        // Try to use the type mappers first
        if (typeMapping.setValue(statement, currentDialect, columnType, null, index)) {
            return
        }

        throw IllegalArgumentException("Unsupported column type for null value: ${columnType::class.qualifiedName}")
    }

    override fun setInputStream(index: Int, inputStream: InputStream, setAsBlobObject: Boolean) {
        val columnType = if (setAsBlobObject) BlobColumnType() else BinaryColumnType(Int.MAX_VALUE)
        if (typeMapping.setValue(statement, currentDialect, columnType, inputStream, index)) {
            return
        }

        throw IllegalArgumentException("Unsupported InputStream for column type: ${columnType::class.qualifiedName}")
    }

    @Deprecated(
        message = "This function will be removed in future releases. " +
            "Replace with the method `setArray(index, this, array)` that accepts an ArrayColumnType as the second argument instead of a string type representation.",
        level = DeprecationLevel.ERROR
    )
    override fun setArray(index: Int, type: String, array: Array<*>) {
        @OptIn(InternalApi::class)
        setArray(index, getArrayColumnType(type), array)
    }

    override fun setArray(index: Int, type: ArrayColumnType<*, *>, array: Array<*>) {
        // Try to use the type mappers first
        if (typeMapping.setValue(statement, currentDialect, type, array, index)) {
            return
        }

        throw IllegalArgumentException("Unsupported array type: ${type::class.qualifiedName}")
    }

    override suspend fun closeIfPossible() {
        // do nothing
    }

    override suspend fun executeBatch(): List<Int> {
        val result = statement.execute()
        val r2dbcResult = R2dbcResult(result, typeMapping)

        return if (wasGeneratedKeysRequested) {
            resultRow = r2dbcResult
            emptyList()
        } else {
            resultRow = null
            r2dbcResult.rowsUpdated().toList()
        }
    }

    override suspend fun cancel() {
        // do nothing
    }
}
